package com.atguigu.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 *
 * 无界流
 * @author admin
 * @date 2021/8/6
 */
public class UnboundedStreamBySocket {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 监听端口
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);


        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // 切割每行数据,并收集到 Collector中
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup");


        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatMap.keyBy(0).sum(1);

        sum.print("test");

        env.execute();

    }
}
